
[HUDI-6466] Fix spark insert overwrite partitioned table with dynamic partition#9113

Merged
danny0405 merged 4 commits into apache:master from flashJd:Fix_spark_overwrite
Aug 3, 2023

Conversation

@flashJd
Contributor

@flashJd flashJd commented Jul 3, 2023

Change Logs

When upgrading Hudi from 0.12.2 to 0.13.1, I found that Spark's capability of insert overwrite on a partitioned table with dynamic partitions was lost,
see #8283 (comment):
This can cause serious data problems after upgrading to 0.13.1: users may delete all table data by mistake.
As #7365 (comment) mentioned,
insert_overwrite_table will overwrite the entire table, while insert_overwrite will overwrite only the matching partitions.
Currently we can only use the static partition syntax to achieve insert-overwrite-partition semantics.

Impact

  1. Keep insert overwrite semantics forward compatible (recover the dynamic partition capability)
  2. Realize insert_overwrite_table semantics for partitioned tables, as [HUDI-5317] Fix insert overwrite table for partitioned table #7365 (comment) mentioned, using a config (set hoodie.datasource.write.operation = insert_overwrite_table)
  3. Use append mode so the whole table's data is not deleted, as [HUDI-5884] Support bulk_insert for insert_overwrite and insert_overwrite_table #8076 (comment) mentioned
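The intended semantics above can be sketched as a tiny decision rule. This is an illustrative sketch only — the object and method names are assumptions, not Hudi's actual API:

```scala
// Illustrative sketch (assumed names, not Hudi's real API) of how the
// hoodie.datasource.write.operation value could map to overwrite semantics.
object OverwriteSemantics {
  // Returns true when the whole table should be overwritten.
  def overwritesWholeTable(operation: String, isPartitioned: Boolean): Boolean =
    operation match {
      case "insert_overwrite_table" => true  // explicit whole-table overwrite
      case "insert_overwrite"       => false // overwrite only matching partitions
      case _                        => !isPartitioned // unpartitioned tables are always fully replaced
    }
}
```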

Risk level (write none, low medium or high below)

Medium

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@danny0405 danny0405 added release-0.14.0 priority:blocker Production down; release blocker issue:data-consistency Data consistency issues (duplicates/phantoms) labels Jul 4, 2023
@danny0405
Contributor

This seems like a huge behavior change; we may not have time to fix it for release 0.14.0. cc @boneanxs, can you help review here?

@flashJd
Contributor Author

flashJd commented Jul 4, 2023

} else if (mode == SaveMode.Overwrite && tableExists && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {

Modifying this piece of code caused many tests to fail due to the delete-old-base-path logic, so I just reverted it and will handle it in another PR

@flashJd flashJd force-pushed the Fix_spark_overwrite branch from c6127a0 to ea59dc5 Compare July 4, 2023 03:59
@flashJd
Contributor Author

flashJd commented Jul 4, 2023

@hudi-bot run azure

@flashJd flashJd force-pushed the Fix_spark_overwrite branch from ea59dc5 to 573cf77 Compare July 4, 2023 07:15
@flashJd
Contributor Author

flashJd commented Jul 4, 2023

Rebased on master to fix the pipeline
@hudi-bot run azure

@flashJd flashJd force-pushed the Fix_spark_overwrite branch from 573cf77 to 72e9fc3 Compare July 4, 2023 07:50
@flashJd
Contributor Author

flashJd commented Jul 4, 2023

@hudi-bot run azure

@boneanxs
Contributor

boneanxs commented Jul 4, 2023

@flashJd I noticed this issue before. Yes, this is a behavior change for INSERT_OVERWRITE without partition columns after #7365, but I think it's the right modification? If users don't specify partition columns, should we consider that they want to overwrite the whole table?

Spark SQL does it the same way, i.e. insert overwrite table_name values( #specify partition values) will overwrite the whole table.

@flashJd
Contributor Author

flashJd commented Jul 5, 2023

@flashJd I noticed this issue before. Yes, this is a behavior change for INSERT_OVERWRITE without partition columns after #7365, but I think it's the right modification? If users don't specify partition columns, should we consider that they want to overwrite the whole table?

Spark SQL does it the same way, i.e. insert overwrite table_name values( #specify partition values) will overwrite the whole table.

  1. The capability of insert overwrite on a partitioned table with dynamic partitions is lost; we can now only use the grammar insert overwrite hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100.
  2. The insert overwrite semantics are not forward compatible

@flashJd
Contributor Author

flashJd commented Jul 5, 2023

As we need the capability to insert overwrite the whole partitioned table, why not use a config to enable it and keep the semantics forward compatible, while not losing the dynamic partition capability?

@boneanxs
Contributor

boneanxs commented Jul 5, 2023

You can still use dynamic partition, in this way:

insert overwrite hudi_cow_pt_tbl partition(dt, hh) select 13, 'a13', 1100,  '2021-12-09', '12'

the main point is whether we consider insert overwrite hudi_cow_pt_tbl select 13, 'a13', 1100, '2021-12-09', '12' to be a dynamic partition write. I think @leesf's view makes sense, #7365 (comment)

@nsivabalan hi, here are my two cents: insert overwrite xxx values(xx,xxx) has very clear semantics — it means overwrite the entire table; insert overwrite xx partition(xx) values(xx,xxx) means insert overwrite partitions. But Hudi applies overwrite-partitions semantics to the overwrite-table case, which is a definite bug, and I do not think we need to introduce a new operation for it.

Also, this change keeps the behavior consistent with Spark SQL: insert overwrite hive_pt_tbl select 13, 'a13', 1100, '2021-12-09', '12' will overwrite the whole table by default.

@flashJd
Contributor Author

flashJd commented Jul 5, 2023

I'm confused why insert overwrite hudi_cow_pt_tbl select 13, 'a13', 1100, '2021-12-09', '12' is not a dynamic partition write.
The semantics should be controllable by a config.
We should clarify the concepts of static partition and dynamic partition:
1) https://iceberg.apache.org/docs/latest/spark-writes/#insert-overwrite
Iceberg dynamic and static partition overwrite semantics
2) https://docs.databricks.com/delta/selective-overwrite.html#language-sql
Delta Lake dynamic partition overwrite semantics
3)https://hudi.apache.org/cn/docs/quick-start-guide/#insert-overwrite
-- insert overwrite partitioned table with dynamic partition
insert overwrite table hudi_cow_pt_tbl select 10, 'a10', 1100, '2021-12-09', '10';

-- insert overwrite partitioned table with static partition
insert overwrite hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100;

@nsivabalan @yihua @XuQianJin-Stars @KnightChess

@boneanxs
Contributor

boneanxs commented Jul 5, 2023

How about following the Spark behavior? We should respect the Spark config spark.sql.sources.partitionOverwriteMode: if it's static, overwrite the whole table; otherwise, if dynamic, overwrite only the changed partitions.

It appears this is also how Iceberg works.

@flashJd
Contributor Author

flashJd commented Jul 5, 2023

How about following the Spark behavior? We should respect the Spark config spark.sql.sources.partitionOverwriteMode: if it's static, overwrite the whole table; otherwise, if dynamic, overwrite only the changed partitions.

It appears this is also how Iceberg works.

Agreed, I will check the logic to respect the Spark config

@KnightChess
Contributor

KnightChess commented Jul 6, 2023

  • Internally, in order to be compatible with users' habits from Hive, we use hive.exec.dynamic.partition.mode to stay compatible with it.
  • In Spark, this behavior differs across data sources. For example, when writing to a Hive table, spark.sql.sources.partitionOverwriteMode is not effective; InsertIntoHiveTable is only affected by hive.exec.dynamic.xxx, which has its own controlling behavior.
  • So, if we want to adopt the Spark behavior here, I think the behavior should belong to Hudi itself, not the engine, and every engine would implement it this way using its own parameters. What do you think? @flashJd @boneanxs

@flashJd
Contributor Author

flashJd commented Jul 6, 2023

Do you mean using Hudi's own parameter to control the dynamic partition behavior?

@danny0405
Contributor

Do you mean using Hudi's own parameter to control the dynamic partition behavior?

Wondering how Iceberg handles the syncing of different engine behaviors.

@KnightChess
Contributor

Do you mean using Hudi's own parameter to control the dynamic partition behavior?

Yeah, but from the engine's perspective, Hudi is just a format, and having the engine control the deletion behavior is also reasonable.

@KnightChess
Contributor

Wondering how Iceberg handles the syncing of different engine behaviors.

I only know that in Spark it is controlled by the computing engine

@flashJd
Contributor Author

flashJd commented Jul 6, 2023

How about following the Spark behavior? We should respect the Spark config spark.sql.sources.partitionOverwriteMode: if it's static, overwrite the whole table; otherwise, if dynamic, overwrite only the changed partitions.
It appears this is also how Iceberg works.

Agreed, I will check the logic to respect the Spark config

@boneanxs @KnightChess @danny0405

  1. I tried to respect the Spark config spark.sql.sources.partitionOverwriteMode, but found it's only supported in DataSource V2; since HoodieInternalV2Table only supports the V1_BATCH_WRITE TableCapability, it can't extend the interface SupportsDynamicOverwrite
  2. For Iceberg, I found it respects the Spark config by implementing the interface SupportsDynamicOverwrite, but it also has
    its own config to control the static/dynamic overwrite semantics,
    https://github.com/apache/iceberg/blob/1f1ec4be478feae79b04bcea3e9a8556d8076054/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java#L106
  3. As V2 BATCH_WRITE is not supported now, we can first use the Hudi config hoodie.datasource.write.operation = insert_overwrite_table/insert_overwrite to implement the static/dynamic overwrite semantics, then respect the Spark config when V2 write is supported.
    What do you think?

@flashJd flashJd force-pushed the Fix_spark_overwrite branch from a811bc5 to 034e827 Compare July 6, 2023 12:34
@boneanxs
Contributor

boneanxs commented Jul 7, 2023

I looked into how Iceberg and Delta handle this,

Iceberg

like @flashJd mentioned before:

  1. The write option overwrite-mode, like Hudi's HoodieOptionConfig
  2. Controlled by spark.sql.sources.partitionOverwriteMode; by implementing the interface SupportsDynamicOverwrite, Spark handles this internally.

Delta

Roughly the same as Iceberg, but Delta has an extra Delta config to control this, so:

  1. spark.databricks.delta.dynamicPartitionOverwrite.enabled: if true, dynamic overwrite; otherwise
  2. respect the write option partitionOverwriteMode or the Spark SQL config spark.sql.sources.partitionOverwriteMode, which is dynamic or static

I tend to favor Iceberg's behavior, since it's clearer and sufficient for users to control this, and adding a new configuration would increase the learning curve for users, especially since they are already familiar with spark.sql.sources.partitionOverwriteMode.

I tried to respect the Spark config spark.sql.sources.partitionOverwriteMode, but found it's only supported in DataSource V2; since HoodieInternalV2Table only supports the V1_BATCH_WRITE TableCapability, it can't extend the interface SupportsDynamicOverwrite

SupportsDynamicOverwrite is just a WriteBuilder interface, like SupportsOverwrite; you can implement it in HoodieV1WriteBuilder, e.g.:

private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
                                   hoodieCatalogTable: HoodieCatalogTable,
                                   spark: SparkSession)
  extends SupportsTruncate with SupportsDynamicOverwrite with SupportsOverwrite with ProvidesHoodieConfig {

  private var overwriteDynamic = false
  // ...
  override def overwriteDynamicPartitions(): WriteBuilder = {
    overwriteDynamic = true
    this
  }
}

then you need to update the method HoodieV1WriteBuilder#build to respect overwriteDynamic.

For the V1 write path InsertIntoHoodieTableCommand, it's much simpler: we can read the SparkConf to get the value of spark.sql.sources.partitionOverwriteMode and adapt the process accordingly.
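The V1-path idea could look roughly like this. A minimal sketch assuming the conf is available as a plain map: the config key is the real Spark SQL setting, but the object and helper names are illustrative, not Hudi's actual code:

```scala
// Sketch only: decide dynamic vs. static overwrite from the Spark SQL conf.
// "spark.sql.sources.partitionOverwriteMode" is the real Spark config key;
// the object and method names here are assumptions for illustration.
object PartitionOverwrite {
  val SparkKey = "spark.sql.sources.partitionOverwriteMode"

  // Spark's documented default for this config is STATIC.
  def isDynamic(conf: Map[String, String]): Boolean =
    conf.getOrElse(SparkKey, "STATIC").equalsIgnoreCase("DYNAMIC")
}
```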

@flashJd
Contributor Author

flashJd commented Jul 7, 2023

I looked into how Iceberg and Delta handle this,

Iceberg

like @flashJd mentioned before:

  1. The write option overwrite-mode, like Hudi's HoodieOptionConfig
  2. Controlled by spark.sql.sources.partitionOverwriteMode; by implementing the interface SupportsDynamicOverwrite, Spark handles this internally.

Delta

Roughly the same as Iceberg, but Delta has an extra Delta config to control this, so:

  1. spark.databricks.delta.dynamicPartitionOverwrite.enabled: if true, dynamic overwrite; otherwise
  2. respect the write option partitionOverwriteMode or the Spark SQL config spark.sql.sources.partitionOverwriteMode, which is dynamic or static

I tend to favor Iceberg's behavior, since it's clearer and sufficient for users to control this, and adding a new configuration would increase the learning curve for users, especially since they are already familiar with spark.sql.sources.partitionOverwriteMode.

I tried to respect the Spark config spark.sql.sources.partitionOverwriteMode, but found it's only supported in DataSource V2; since HoodieInternalV2Table only supports the V1_BATCH_WRITE TableCapability, it can't extend the interface SupportsDynamicOverwrite

SupportsDynamicOverwrite is just a WriteBuilder interface, like SupportsOverwrite; you can implement it in HoodieV1WriteBuilder, e.g.:

private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
                                   hoodieCatalogTable: HoodieCatalogTable,
                                   spark: SparkSession)
  extends SupportsTruncate with SupportsDynamicOverwrite with SupportsOverwrite with ProvidesHoodieConfig {

  private var overwriteDynamic = false
  // ...
  override def overwriteDynamicPartitions(): WriteBuilder = {
    overwriteDynamic = true
    this
  }
}

then you need to update the method HoodieV1WriteBuilder#build to respect overwriteDynamic.

For the V1 write path InsertIntoHoodieTableCommand, it's much simpler: we can read the SparkConf to get the value of spark.sql.sources.partitionOverwriteMode and adapt the process accordingly.

1) SupportsDynamicOverwrite needs the table capability BATCH_WRITE, which is not supported now
2) V1_BATCH_WRITE does not support DynamicOverwrite:
https://github.com/apache/spark/blob/d4277b8e78347fd4e3163c6218edc4675ebb6db2/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java#L101C33-L101C33
3) We should implement V2 BATCH_WRITE as part of the DataSource V2 work as a whole, not here
So why not use an extra Hudi config to control it like Iceberg & Delta?

@boneanxs
Contributor

boneanxs commented Jul 7, 2023

Adding the capability OVERWRITE_DYNAMIC should be enough? Not sure what "implement v2 BATCH_WRITE in datasource V2 feature as a whole" means; this is my WeChat: rexboom_an, you can add me so we can align on this quickly.

why not use an extra Hudi config to control it like Iceberg & Delta

I mean we also need to add an option in HoodieOptionConfig to control this, but besides that, we can respect spark.sql.sources.partitionOverwriteMode since Spark users are familiar with it.

@flashJd
Contributor Author

flashJd commented Jul 7, 2023

Adding the capability OVERWRITE_DYNAMIC should be enough? Not sure what "implement v2 BATCH_WRITE in datasource V2 feature as a whole" means; this is my WeChat: rexboom_an, you can add me so we can align on this quickly.

why not use an extra Hudi config to control it like Iceberg & Delta

I mean we also need to add an option in HoodieOptionConfig to control this, but besides that, we can respect spark.sql.sources.partitionOverwriteMode since Spark users are familiar with it.

To summarize what we discussed, when encountering an insert overwrite:

  1. First respect hoodie.datasource.write.operation; if it is configured, use it to insert overwrite the partition/table
  2. If hoodie.datasource.write.operation is not configured, use a new Hudi config to control the behavior, like Iceberg/Delta Lake
  3. If the new config is not set, respect Spark's spark.sql.sources.partitionOverwriteMode, whose default value is static.
    This behavior is not forward compatible but is aligned with Spark
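The agreed precedence can be sketched as a single resolution function. This is a hedged sketch: the string values mirror the configs quoted in the thread, but the function itself is an assumption, not Hudi's actual implementation:

```scala
// Sketch of the agreed precedence: operation config first, then the new
// Hudi overwrite-mode config, then Spark's partitionOverwriteMode.
object OverwritePrecedence {
  def resolveIsOverwriteTable(operation: Option[String],
                              hudiOverwriteMode: Option[String],
                              sparkOverwriteMode: String): Boolean =
    operation match {
      case Some("insert_overwrite_table") => true  // step 1: operation config wins
      case Some("insert_overwrite")       => false
      case _ =>
        // steps 2 and 3: Hudi config if set, else Spark's mode (default STATIC)
        hudiOverwriteMode.getOrElse(sparkOverwriteMode).toUpperCase match {
          case "STATIC"  => true
          case "DYNAMIC" => false
          case other     => throw new IllegalArgumentException(s"Unknown overwrite mode: $other")
        }
    }
}
```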

@danny0405
Contributor

@flashJd Since we already have a conclusion, are you interested in fixing it for release 0.14.0?

@flashJd
Contributor Author

flashJd commented Jul 28, 2023

@flashJd Since we already have a conclusion, are you interested in fixing it for release 0.14.0?

Yeah, sorry for the late fix; I'll fix it today, as we discussed

@flashJd flashJd force-pushed the Fix_spark_overwrite branch from 034e827 to 5c02442 Compare July 28, 2023 12:36
@flashJd
Contributor Author

flashJd commented Jul 28, 2023

As the behavior is aligned with Spark but not forward compatible, we should call out this change in the 0.14.0 release notes

@danny0405
Contributor

@boneanxs Can you help to take a look again ~

Contributor

@boneanxs boneanxs left a comment


Great job! Overall LGTM from my side, just 2 minor comments to refine the code

def deduceIsOverwriteTable(sparkSession: SparkSession,
                           catalogTable: HoodieCatalogTable,
                           partitionSpec: Map[String, Option[String]]): Boolean = {
  val operation = sparkSession.sqlContext.getConf(OPERATION.key, "")
Contributor


Better to use combineOptions() here to keep the sources of Hudi config consistent with the others.

true
} else if (operation.nonEmpty && operation.equals(INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
false
} else {
Contributor


Would using a Scala match expression be clearer?

    operation match {
      case INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL =>
        true
      case INSERT_OPERATION_OPT_VAL =>
        false
      case _ =>
        // NonPartitioned table always insert overwrite whole table
        if (catalogTable.partitionFields.isEmpty) {
          true
        } else {
          // Insert overwrite partitioned table with PARTITION clause will always insert overwrite the specific partition
          if (partitionSpec.nonEmpty) {
            false
          } else {
            // If hoodie.datasource.overwrite.mode configured, respect it
            val hoodieOverwriteMode = sparkSession.sqlContext.getConf(OVERWRITE_MODE.key,
              sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
            
            hoodieOverwriteMode match {
              case "STATIC" =>
                true
              case "DYNAMIC" =>
                false
              case _ =>
                throw new IllegalArgumentException("xxx")
            }
          }
        }
    }

Contributor Author


Thanks for the review, I'll refine the code later

false
} else {
// If hoodie.datasource.overwrite.mode configured, respect it, otherwise respect spark.sql.sources.partitionOverwriteMode
val hoodieOverwriteMode = sparkSession.sqlContext.getConf(OVERWRITE_MODE.key,
Contributor


should be:
val hoodieOverwriteMode = combinedOpts.getOrElse(OVERWRITE_MODE.key, sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()

@flashJd flashJd force-pushed the Fix_spark_overwrite branch from 1f8dd85 to a14a553 Compare August 2, 2023 01:53
Contributor

@boneanxs boneanxs left a comment


+1

@flashJd
Contributor Author

flashJd commented Aug 2, 2023

@hudi-bot run azure

@hudi-bot
Collaborator

hudi-bot commented Aug 2, 2023

CI report:


@danny0405
Contributor

@danny0405 danny0405 merged commit d67455a into apache:master Aug 3, 2023
@xuzifu666
Member

@danny0405 @flashJd Our version contains this PR, but insert overwrite on a table with partitions also triggers overwrite table, not overwrite partition

@flashJd
Contributor Author

flashJd commented Nov 3, 2023

@danny0405 @flashJd Our version contains this PR, but insert overwrite on a table with partitions also triggers overwrite table, not overwrite partition

The default behavior is overwrite table; use the config to switch to overwrite partition, see the tests


Labels

issue:data-consistency Data consistency issues (duplicates/phantoms) priority:blocker Production down; release blocker release-0.14.0

Projects

Status: ✅ Done

Development

Successfully merging this pull request may close these issues.

6 participants